iT邦幫忙

2024 iThome 鐵人賽

DAY 6
0
Software Development

用 NestJS 闖蕩微服務!系列 第 6

[用NestJS闖蕩微服務!] DAY06 - NATS Transporter

  • 分享至 

  • xImage
  •  

什麼是 NATS ?

NATS Logo

圖片來源

NATS 是一套輕量、安全且高效的訊息傳遞系統,經常用於微服務之間的溝通、物聯網設備的訊息傳遞、雲端原生(Cloud Native) 應用等。

NATS 採用 Publish/Subscribe 機制來交換訊息,Subscriber 會訂閱某個特定的 主題(Subject),當 Publisher 發送該 Subject 的訊息時,NATS 會將訊息轉發給 Subscriber。

NATS Publish/Subscribe Concept

Subject

NATS Subject 是 Publisher 與 Subscriber 溝通的介面,格式為 ASCII 編碼的字串,建議使用字元為 azAZ09。它具有 階層(Hierarchy) 的概念,使用 點(.) 來表示資訊層級。下方是幾個 NATS Subject 範例:

  • shop.order.created
  • shop.order.cancelled
  • shop.product.created
  • shop.product.updated

NATS Subject 的 Hierarchy 設計可以讓 Subscriber 透過 Wildcards 來訂閱符合條件的 Subject,大幅降低逐一訂閱的成本。下方是採用 Wildcards 的 Subject:

  • shop.*.created
  • shop.>

會發現有 *> 兩種特殊符號,它們的用途如下:

  • *:該字元的階層可以是任何字元,以 shop.*.created 來說,shop.order.createdshop.product.created 都符合篩選條件。
  • >:該字元的階層與下方所有階層可以是任何字元,以 shop.> 來說,上方四個範例的 Subject 都符合篩選條件。

補充:如果有看過前一篇介紹 MQTT 的話,會發現 Subject 的整體設計跟 MQTT Topic 大同小異,僅差在名詞與使用的字元、編碼不同,但概念基本上是差不多的。

Queue Groups

實務上經常會遇到需要水平擴展應用程式的情況,假如我們擴展了某個 Subscriber,此時會發生一個狀況,就是這些擴展出來的 Subscriber 會訂閱相同的 Subject,當 Publisher 發送訊息時,很可能導致 重工 的問題發生,所以 NATS 提供 佇列群組(Queue Groups) 的功能來實現 負載平衡(Load Balance)

Queue Groups 的運作原理很簡單,Subscriber 在訂閱的時候,告訴 NATS 自己屬於哪個 Queue Group,當 Publisher 發送訊息時,NATS 會隨機分配給其中一個 Subscriber。

Queue Groups Concept

Request/Reply

NATS 除了使用 Publish/Subscribe 的方式來交換訊息,它還提供了 Request/Reply 的機制。NATS 實現 Request/Reply 的方式也是基於 Publish/Subscribe,當 Subscriber 收到訊息時,會將要回覆的訊息發送到回覆用的 Subject,這個 Subject 稱為 inbox

NATS Request Reply Concept

QoS

NATS 有提供三種服務品質,分別是:最多傳輸一次至少傳輸一次確實傳輸一次。要採用某一種服務品質取決於使用的是 Core NATS 或是使用 NATS 的 JetStream 附加功能。

補充:如果有看過前一篇介紹 MQTT 的話,會發現 NATS 提供的三種 QoS 與 MQTT 定義的 QoS0、QoS1、QoS2 有相同的意義。

Core NATS

Core NATS 指的就是 NATS 本身,它並沒有持久性處理的功能,所以只提供最多傳輸一次的 QoS,假如 Subscriber 在 Publisher 發送訊息後才訂閱 Subject,就會發生 訊息遺失 的問題。

補充:雖然 Core NATS 不提供持久性處理,但可以運用 Request/Reply 機制讓開發者透過 Reply 來自行處理失敗等情況。

NATS JetStream

NATS JetStream 是內建於 NATS 中的附加功能,實現了持久性處理,提供至少傳輸一次及確實傳輸一次的 QoS。

補充:NATS JetStream 比起 Core NATS 有更多更複雜的概念,為了不讓本篇文的內容失焦,故不做進一步說明,有興趣可以參考官方文件

安裝 NATS

在終端機輸入下方指令以便從 Docker Hub 下載 NATS 的 Docker Image:

$ docker pull nats

在啟動 NATS 之前,需要預先準備可以操作 NATS 的環境,這邊推薦使用 nats-box 這個輕量級的 NATS 工具程式,它以 Docker Image 的方式提供給開發者使用,是非常方便的工具。透過下方指令從 Docker Hub 下載它的 Docker Image:

$ docker pull natsio/nats-box

NATS 跟 nats-box 都會以 Container 的方式運作,為了讓它們之間可以進行通訊,故需要建立一個 Docker Bridge Network 來建立通訊的橋樑:

$ docker network create <NETWORK_NAME>

接下來就可以架設 NATS 了,透過下方指令將 NATS 架設在 4222 port 並運行於 <NETWORK_NAME> 上:

$ docker run --name <NAME> --network <NETWORK_NAME> -p 4222:4222 -d nats

操作 NATS

在讓兩個服務透過 NATS 進行交換訊息之前,先試著用 nats-box 來操作。在終端機輸入下方指令啟動 nats-box 並將它運行於 <NETWORK_NAME> 上:

$ docker run --network <NETWORK_NAME> --rm -it natsio/nats-box

接著,開啟一個新的終端機並輸入下方指令來找出 NATS 的 IP 位址:

$ docker network inspect <NETWORK_NAME>

NATS IP

找到 IP 之後,回到啟動 nats-box 的終端機並使用下方指令訂閱 Subject 為 order.created 的訊息:

$ nats sub -s nats://<NATS_SERVER_IP>:4222 order.created

NATS Basic Subscription

接著,使用新的終端機啟動 nats-box 並輸入下方指令來發送 Subject 為 order.created 的訊息:

$ nats pub -s nats://<NATS_SERVER_IP>:4222 order.created Hello

NATS Basic Publisher

此時 Subscriber 會收到訊息:

NATS Basic Subscription Result

接著,來測試一下使用 Wildcards 進行訂閱,透過下方指令訂閱符合 shop.*.created 的 Subject:

$ nats sub -s nats://<NATS_SERVER_IP>:4222 shop.*.created

此時發送 Subject 為 shop.order.created 的訊息:

$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created Hello

Subscriber 會順利收到發送的訊息:

NATS Subscriber Filter Result

最後來測試一下使用 Queue Group 負載平衡,在兩個終端機啟動 nats-box 並輸入下方指令訂閱 shop.order.created 的 Subject:

$ nats sub -s nats://<NATS_SERVER_IP>:4222 --queue <QUEUE_NAME> shop.order.created

此時發送 Subject 為 shop.order.created 的訊息:

$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created Hello

只有一個 Subscriber 會收到發送的訊息:

NATS Queue Group Result

NATS Transporter

NestJS 實作了 NATS Transporter,讓微服務應用程式可以用跟其他 Transporter 一樣的開發風格來使用 NATS。

前置作業

要使用 NATS Transporter 之前需要先安裝下方套件:

$ npm install nats

補充:nats 是一套用於 Node.js 的 NATS 客戶端,有興趣可以參考 官方文件

建立微服務應用程式

修改載入點 main.ts 的內容,將 transport 設定為 Transport.NATS,並根據架設的 NATS 位址來設定 servers 的資訊:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.NATS,
      options: {
        servers: ['nats://localhost:4222'],
      }
    },
  );
  await app.listen();
}
bootstrap();

上方範例在 options 只使用了 servers,事實上,NATS Transporter 的 options 內容十分豐富,詳細設定可以參考 nats 官方文件

Transporter 訊息模式

NATS Transporter 也支援 Request-response 與 Event-based 訊息模式。下方是範例程式碼,在 AppController 內實作 sayHello 方法並套用 @MessagePattern 裝飾器,以及實作 onCreated 方法並套用 @EventPattern 裝飾器:

import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @MessagePattern({ cmd: 'hello' })
  sayHello(data: string) {
    console.log(data);
    return `Hello, ${data}`;
  }

  @EventPattern('shop.*.created')
  onCreated(item: { name: string; }) {
    console.log(item);
  }
}

使用 nats-box 來進行測試,透過下方指令發送 Subject 為 shop.order.created 的訊息:

$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created '{"name":"noodle"}'

此時在微服務應用程式的終端機會顯示發送的訊息,因為 Subject 有符合訂閱的篩選條件:

NestJS NATS Transporter Test Result1

Pattern 設定為可序列化物件的話,要如何透過 nats-box 傳送訊息呢?這部分的處理方式與 Mosquitto Command 相同,將該可序列化物件轉成 JSON 字串即可,以上方程式碼來說,我們要將 { cmd: 'hello' } 轉成字串 '{"cmd":"hello"}' 當作 Subject 名稱:

$ nats request -s nats://<NATS_SERVER_IP>:4222 "{\"cmd\":\"hello\"}" HAO

補充request 指令會自動產生 inbox 並訂閱,當 Subscriber 透過該 inbox 回覆訊息時,終端機會出現該訊息。

此時會發現微服務應用程式的終端機顯示了錯誤訊息,原因是 NATS Transporter 在預設情況下,使用的是 natsJSONCodec 來反序列化訊息的,表示訊息必須要是 可反序列化的字串

NestJS NATS Transporter Test Result2

補充:從 NestJS 的原始碼即可得知,針對 NATS 的訊息會使用 natsJSONCodec 來反序列化訊息,有興趣的話可以參考這裡

如果我們期望在發送訊息時使用不可反序列化的字串並且在 Controller 拿到該訊息,如上方範例的操作方式,這時候要如何處理呢?我們需要自訂 Deserializer 來針對訊息做客製化的處理。下方是範例程式碼,新增一個 NatsCustomDeserializerclass 並實作 Deserializer 介面:

import { Deserializer } from '@nestjs/microservices';
import { NatsRequestJSONDeserializer } from '@nestjs/microservices/deserializers';
import { StringCodec, JSONCodec } from 'nats';

export class NatsCustomDeserializer implements Deserializer {
  // 實例化預設的 Deserializer
  private originDeserializer = new NatsRequestJSONDeserializer();

  deserialize(value: Uint8Array, options?: Record<string, any>) {
    try {
      return this.originDeserializer.deserialize(value, options);
    } catch (e) {
      const CUSTOM_VALUE_PROP_NAME = '__custom_value__';
      // 用 `StringCodec` 進行訊息解碼
      const decodedValue = StringCodec().decode(value);
      // 使用 `JSONCodec` 重新進行編碼
      const encodedValue = JSONCodec().encode({ [CUSTOM_VALUE_PROP_NAME]: decodedValue });
      const handledRequest = this.originDeserializer.deserialize(encodedValue, options);
      const customValue = handledRequest.data[CUSTOM_VALUE_PROP_NAME];
      // 覆寫 `data`,改成使用 `__custom_value__` 的資料,並添加 `id` 讓 Request-response 能順利運作
      return { ...handledRequest, data: customValue, id: crypto.randomUUID() };
    }
  }
}

預設情況下是使用 NatsRequestJSONDeserializer,這邊自訂的 Deserializer 設計思路是基於它來進行例外處理,假如無法成功反序列化,那就改成使用 natsStringCodec 來解碼,為了讓原本的流程可以順利運作,這裡自訂了一個 __custom_value__ 的屬性將本來的訊息做一層包裝,再將它帶給 NatsRequestJSONDeserializer 進行處理,處理完畢後,將本來的 data 屬性值進行覆寫,改成使用 data 裡面的 __custom_value__,這裡需要特別注意,需要在回傳時多帶 id,這樣 Request-response 才能正常運作。

實作完 NatsCustomDeserializer 後,要在 main.ts 套用它:

// ...
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.NATS,
      options: {
        servers: ['nats://localhost:4222'],
        deserializer: new NatsCustomDeserializer(),
      }
    },
  );
  // ...
}
// ...

此時再次執行上方的指令,就會順利在微服務應用程式的終端機顯示訊息:

NestJS NATS Transporter Test Result3

而執行 nats-box 的終端機也會顯示收到的回覆訊息:

NestJS NATS Transporter Test Result4

取得 Payload 與 Context

假如要取得該請求的相關資訊,比如:Subject 名稱,可以透過 @Ctx 裝飾器取得 NatsContext。下方是範例程式碼:

import { Controller } from '@nestjs/common';
import {
  Ctx,
  Payload,
  EventPattern,
  NatsContext
} from '@nestjs/microservices';

@Controller()
export class AppController {
  @EventPattern('shop.*.created')
  onCreated(
    @Payload() item: { name: string; },
    @Ctx() ctx: NatsContext
  ) {
    console.log(ctx.getSubject());
    console.log(item);
  }
}

使用 nats-box 來進行測試,透過下方指令發送 Subject 為 shop.order.created 的訊息:

$ nats pub -s nats://<NATS_SERVER_IP>:4222 shop.order.created '{"name":"noodle"}'

此時在微服務應用程式的終端機會顯示發送的訊息以及 Subject 名稱:

NestJS NATS Transporter Test Context Result

建立客戶端

修改 AppModule 的內容,透過 ClientsModule 建立 NATS Transporter 的 ClientProxy

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// ...

@Module({
  // ...
  imports: [
    ClientsModule.register([
      {
        name: 'NATS_SERVICE',
        transport: Transport.NATS,
        options: {
          servers: ['nats://localhost:4222'],
        }
      }
    ])
  ]
})
export class AppModule {}

傳送訊息

NATS Transporter 也支援 Request-response 與 Event-based 訊息模式,所以可以使用 ClientProxysendemit 方法。下方是範例程式碼,修改 AppController 的內容,使用 @Inject 裝飾器注入 ClientProxy,並設計 getHelloonOrderCreated 方法:

import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(
    @Inject('NATS_SERVICE') private readonly natsService: ClientProxy
  ) {}

  @Get()
  getHello() {
    return this.natsService.send({ cmd: 'hello' }, 'HAO');
  }

  @Get('orderCreated')
  onOrderCreated() {
    return this.natsService.emit('shop.order.created', { name: 'test' });
  }
}

透過 Postman 使用 GET 方法存取 http://localhost:3000,會看到下方的結果:

NestJS NATS Client Proxy Result1

透過 Postman 使用 GET 方法存取 http://localhost:3000/orderCreated,在微服務應用程式的終端機會看到 { name: 'test' }

NestJS NATS Client Proxy Result2

NATS Record Builder

在某些情境下,可能會運用 NATS Header 來傳遞一些資訊,針對這種情況,可以運用 NestJS 設計的 NatsRecordBuilder 來實現。以下方程式碼為例,透過 natsheaders 來建立 Header 資訊,這邊我們定義 x-version1,並經由 NatsRecordBuilder 把訊息與 Header 整理成 Record,最後將它當作資料讓 ClientProxy 來處理:

import { Inject, Controller, Get } from '@nestjs/common';
import { ClientProxy, NatsRecordBuilder } from '@nestjs/microservices';
import * as nats from 'nats';

@Controller()
export class AppController {
  constructor(
    @Inject('NATS_SERVICE') private readonly natsService: ClientProxy
  ) {}

  @Get('orderCreated')
  onOrderCreated() {
    const headers = nats.headers();
    headers.set('x-version', '1');
    const record = new NatsRecordBuilder({ name: 'test' })
      .setHeaders(headers)
      .build();
    return this.natsService.emit('shop.order.created', record);
  }
}

修改微服務應用程式的部分,在 AppControlleronCreated 方法裡透過 console.log 將 Header 資訊印出:

import { Controller } from '@nestjs/common';
import {
  Ctx,
  Payload,
  EventPattern,
  NatsContext
} from '@nestjs/microservices';
import { MsgHdrsImpl } from 'nats';

@Controller()
export class AppController {
  @EventPattern('shop.*.created')
  onCreated(
    @Payload() item: { name: string; },
    @Ctx() ctx: NatsContext
  ) {
    const headers: MsgHdrsImpl = ctx.getHeaders();
    const version = headers.get('x-version');
    console.log(version);
  }
}

透過 Postman 使用 GET 方法存取 http://localhost:3000/orderCreated,在微服務應用程式的終端機會看到 1

NestJS NATS Record Builder Result

小結

NATS 是一套輕量、安全且高效的訊息傳遞系統,它採用 Publish/Subscribe 模式,以 Subject 作為溝通的介面。NATS 不僅提供一般 Publish/Subscribe 的方式傳遞訊息,還提供了 Request/Reply 的機制,甚至還設計了 Queue Groups 來實現負載平衡,如果有需要還可以使用 NATS JetStream 來使用更可靠的 QoS。NATS 提供了一定的彈性讓開發者可以根據不同應用場景來調整設置,整體而言,是一個不錯的訊息傳遞系統。

NestJS 提供了 NATS Transporter 讓我們可以用跟其他 Transporter 一樣的開發風格來使用 NATS,針對要設置 NATS Header 的情況,NestJS 運用 Record Builder 這個角色的定位設計了 NatsRecordBuilder,讓開發者可以運用它來設置 NATS Header,如此一來,就不會破壞本來發送訊息的方式了。


上一篇
[用NestJS闖蕩微服務!] DAY05 - MQTT Transporter
下一篇
[用NestJS闖蕩微服務!] DAY07 - RabbitMQ Transporter
系列文
用 NestJS 闖蕩微服務!21
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言